from glob import glob
import pandas as pd
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession,DataFrame,functions as F
from pyspark.sql.types import *
from functools import reduce
from rich import print
import plotly.express as px
import plotly.graph_objects as go
import plotly.offline as py
from ipywidgets import widgets
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
py.init_notebook_mode(connected=True)
#lendo a base de Trips
trips_path = glob("data/*.json")
trips_path.sort()
year = 2009
trips=list()
print ("Lendo as bases trips: ")
for file in trips_path:
trip = spark.read.json(file)
trip = trip.withColumn('year', F.lit(F.year('pickup_datetime')))
print ("[bold]Ano: [/bold]"+str(year)+" [bold]Quantidade: [/bold]"+str(trip.count()))
trips.append(trip)
year +=1
#lendo a base Vendor Lookup
vendor = spark.read.csv('data-vendor_lookup-csv.csv', header=True)
#lendo a base Payment Lookup
payment = spark.read.csv('data-payment_lookup-csv.csv', header=True)
Unindo as bases de trips em uma único dataframe
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
trips = unionAll(trips[0], trips[1], trips[2], trips[3])
print ("[bold]Colunas da tabela Trips: [/bold]: ")
print (trips.columns)
print ("[bold]Colunas da tabela Vendor: [/bold]: ")
print (vendor.columns)
print ("[bold]Colunas da tabela Payment: [/bold]: ")
print (payment.columns)
print ("Total da tabela Trips concatenada anos 2009 a 2012: ", trips.count())
print ("Total da tabela Payment: ", payment.count())
print ("Total da tabela Vendor: ", vendor.count())
Verificando os dataframes
trips.limit(5).toPandas()
vendor.toPandas()
payment.limit(5).toPandas()
media = trips.filter(F.col('passenger_count') <=2).select(F.round(F.mean(F.col('trip_distance')),2)).collect()
print ("[bold]A distância média percorrida por viagens com no máximo [red]2[/red] passageiros é: [/bold]", media[0][0])
Verificando a distribuição de tipos de pagamentos na tabela Trips
trips.groupBy(F.col('payment_type')).count().orderBy(F.col("count").desc()).show()
#padronizando a variável payment_type
trips = trips.withColumn("payment_type", F.upper(F.col('payment_type')))
Verificando a distribuição de tipos de pagamentos na tabela Trips após Padronização
trips.groupBy(F.col('payment_type')).count().orderBy(F.col("count").desc()).show()
print ("[bold]Os [red]3[/red] maiores vendors em quantidade total de dinheiro arrecadado são: [/bold]")
name = vendor.select('vendor_id', 'name')
trips.filter(F.col('payment_type') == 'CASH').select(F.col('vendor_id'), F.col('total_amount'))\
.groupBy(F.col('vendor_id')).agg(F.sum("total_amount").alias('total'))\
.join(name, 'vendor_id', "inner").orderBy(F.col('total').desc()).select('name', 'total').limit(3).toPandas()
def histogram(year):
df = trips.filter(F.col('payment_type') == 'CASH').filter(F.col('year') == year)\
.select(F.month(F.col('dropoff_datetime')).alias('month')).orderBy('month').toPandas()
fig = px.histogram(df, x="month", title='Distribuição Mensal de Corridas pagas em Dinheiro em '+str(year))
fig.show()
for y in range(2009, 2013):
histogram(y)
Podemos perceber que sempre nos últimos meses do ano as corridas pagas em dinheiro diminuem
df_month = trips.filter(F.col('year') == 2012).filter(F.month('dropoff_datetime')>=10)\
.select(F.month('dropoff_datetime').alias('month'), F.dayofmonth('dropoff_datetime')\
.alias('day'), F.col('tip_amount'))\
.groupBy(F.col('month'), F.col('day')).agg(F.sum("tip_amount").alias('total_tip'))\
.withColumn('month', F.col('month').cast(StringType())).toPandas()
fig = px.scatter(df_month, x='day', y='total_tip', color='month',
title="Quantidade de Gorjetas por dia nos últimos 3 meses de 2012")
fig.update_layout(
showlegend=True)
fig.show(config=dict(displayModeBar=False))
Como percebemos olhando o gráfico acima no ano de 2012 só teve corrida com gorjeta até o mês de Outubro, por isso vamos olha agora os três últimos meses que teve corrida no ano de 2012, ou seja, Agosto, Setembro e Outubro
df_month = trips.filter(F.col('year') == 2012).filter(F.month('dropoff_datetime')>=8)\
.select(F.month('dropoff_datetime').alias('month'), F.dayofmonth('dropoff_datetime')\
.alias('day'), F.col('tip_amount'))\
.groupBy(F.col('month'), F.col('day')).agg(F.sum("tip_amount").alias('total_tip'))\
.withColumn('month', F.col('month').cast(StringType())).toPandas()
fig = px.scatter(df_month, x='day', y='total_tip', color='month',
title="Quantidade de Gorjetas por dia nos meses de Agosto, Setembro e Outubro de 2012")
fig.update_layout(
showlegend=True)
fig.show(config=dict(displayModeBar=False))
Colocando as variáveis dropoff_datetime e pickup_datetime no formato timestamp
trips = trips.withColumn('dropoff_datetime', F.to_timestamp(F.col('dropoff_datetime').cast(StringType())))\
.withColumn('pickup_datetime', F.to_timestamp(F.col('pickup_datetime').cast(StringType())))
Criando a variável duration que é a diferença do tempo entre as variáveis dropoff_datetime e pickup_datetime
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('dropoff_datetime', format=timeFmt)
- F.unix_timestamp('pickup_datetime', format=timeFmt))
trips = trips.withColumn("duration", timeDiff)
media = trips.filter((F.dayofweek('dropoff_datetime') == 7) | (F.dayofweek('dropoff_datetime') == 1))\
.select(F.round(F.mean(F.col('duration')),2)).collect()
media = round(media[0][0]/60, 2)
print ("[bold]O tempo médio das corridas nos dias de sábado e domingo é: {0} minutos[/bold]".format(media))
Visualizando somente os 100 inicios e finalizações de corridas mais solicitados
df_off = trips.filter(F.col('year') == 2010).groupBy('dropoff_latitude', 'dropoff_longitude').count()\
.orderBy(F.col('count').desc()).filter(F.col('dropoff_latitude') != 0.0).toPandas()
df_off['text'] = "Dropoff: "+(df_off['count']).astype(str)
df_up = trips.filter(F.col('year') == 2010).groupBy('pickup_latitude','pickup_longitude').count()\
.orderBy(F.col('count').desc()).filter(F.col('pickup_latitude') != 0.0).toPandas()
df_up['text'] = "Pickup: "+(df_up['count']).astype(str)
colors = ["darkred", "darkgreen"]
fig = go.Figure()
fig.add_trace(go.Scattergeo(
locationmode = 'USA-states',
lon = df_up['pickup_longitude'].values[:100],
lat = df_up['pickup_latitude'].values[:100],
text = df_up['text'].values[:100],
marker = dict(
size = df_up['count'].values[:100]/2,
color = colors[1],
line_color='#2c3e50',
line_width=0.5,
sizemode = 'area'
),
name = 'Pickup'))
fig.add_trace(go.Scattergeo(
locationmode = 'USA-states',
lon = df_off['dropoff_longitude'].values[:100],
lat = df_off['dropoff_latitude'].values[:100],
text = df_off['text'].values[:100],
marker = dict(
size = df_off['count'].values[:100]/2,
color = colors[0],
line_color='rgb(40,40,40)',
line_width=0.5,
sizemode = 'area'
),
name = 'Dropoff'))
fig.update_layout(
title = '<b>Latitude e longitude de pickups and dropoffs no ano de 2010 (Os 100 mais frequentes)</b>',
titlefont = {'family': 'Arial', 'size': 20},
showlegend = True,
geo = dict(
scope = 'usa',
landcolor = 'rgb(217, 217, 217)',
projection = {'type':'albers usa'},
showland = True,
showlakes = True,
lakecolor = "#3498db",
subunitwidth = 1,
subunitcolor = 'rgb(255, 255, 255)'
)
)
py.iplot(fig)
Simulando um streaming dos dados no formato em json utilizando o Spark Streaming em conjunto com o Kafka e o Flume
A aplicação irá realizar consulta a cada 3 minutos
sc = spark.sparkContext
ssc = StreamingContext(sc, 180)
sc.setLogLevel("WARN")
kvs = KafkaUtils.createDirectStream(ssc, ["taxi_trips"],{"metadata.broker.list": 'localhost:9092'})
stream = kvs.map(lambda x: json.loads(x[1]))
stream.count().map(lambda x: "Qtd de linhas lidas: %s" % x).pprint()
payment_stream = stream.map(lambda trips: trips['payment_type'])
payment_count = payment_stream.countByValue()
payment_count.pprint()
ssc.start()
try:
ssc.awaitTermination()
except KeyboardInterrupt as erro:
print ("[red bold]Finalizado :tada:")